/*
 Licensed to the Apache Software Foundation (ASF) under one
 or more contributor license agreements.  See the NOTICE file
 distributed with this work for additional information
 regarding copyright ownership.  The ASF licenses this file
 to you under the Apache License, Version 2.0 (the
 "License"); you may not use this file except in compliance
 with the License.  You may obtain a copy of the License at

     http://www.apache.org/licenses/LICENSE-2.0

 Unless required by applicable law or agreed to in writing,
 software distributed under the License is distributed on an
 "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 KIND, either express or implied.  See the License for the
 specific language governing permissions and limitations
 under the License.
 */
package org.apache.plc4x.java.examples.cloud.google;

import io.jsonwebtoken.JwtBuilder;
import io.jsonwebtoken.Jwts;
import io.jsonwebtoken.SignatureAlgorithm;
import org.apache.plc4x.java.PlcDriverManager;
import org.apache.plc4x.java.api.PlcConnection;
import org.apache.plc4x.java.api.messages.PlcReadRequest;
import org.apache.plc4x.java.api.messages.PlcReadResponse;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.joda.time.DateTime;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.security.KeyFactory;
import java.security.NoSuchAlgorithmException;
import java.security.spec.InvalidKeySpecException;
import java.security.spec.PKCS8EncodedKeySpec;

// [START iot_mqtt_includes]
// [END iot_mqtt_includes]

public class S7PlcToGoogleIoTCoreSample {

    private static final Logger logger = LoggerFactory.getLogger(S7PlcToGoogleIoTCoreSample.class);

    // [START iot_mqtt_jwt]

    /**
     * Create a Cloud IoT Core JWT for the given project id, signed with the given RSA key.
     */
    private static String createJwtRsa(String projectId, String privateKeyFile)
        throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
        DateTime now = new DateTime();
        // Create a JWT to authenticate this device. The device will be disconnected after the token
        // expires, and will have to reconnect with a new token. The audience field should always be set
        // to the GCP project id.
        JwtBuilder jwtBuilder =
            Jwts.builder()
                .setIssuedAt(now.toDate())
                .setExpiration(now.plusMinutes(20).toDate())
                .setAudience(projectId);

        byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyFile));
        PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(keyBytes);
        KeyFactory kf = KeyFactory.getInstance("RSA");

        return jwtBuilder.signWith(SignatureAlgorithm.RS256, kf.generatePrivate(spec)).compact();
    }

    /**
     * Create a Cloud IoT Core JWT for the given project id, signed with the given ES key.
     */
    private static String createJwtEs(String projectId, String privateKeyFile)
        throws IOException, NoSuchAlgorithmException, InvalidKeySpecException {
        DateTime now = new DateTime();
        // Create a JWT to authenticate this device. The device will be disconnected after the token
        // expires, and will have to reconnect with a new token. The audience field should always be set
        // to the GCP project id.
        JwtBuilder jwtBuilder =
            Jwts.builder()
                .setIssuedAt(now.toDate())
                .setExpiration(now.plusMinutes(20).toDate())
                .setAudience(projectId);

        byte[] keyBytes = Files.readAllBytes(Paths.get(privateKeyFile));
        PKCS8EncodedKeySpec spec = new PKCS8EncodedKeySpec(keyBytes);
        KeyFactory kf = KeyFactory.getInstance("EC");

        return jwtBuilder.signWith(SignatureAlgorithm.ES256, kf.generatePrivate(spec)).compact();
    }
    // [END iot_mqtt_jwt]

    /**
     * Attaches the callback used when configuration changes occur.
     */
    private static void attachCallback(MqttClient client, String deviceId) throws MqttException {
        // [START iot_mqtt_configcallback]
        MqttCallback mCallback = new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                // Do nothing...
            }

            @Override
            public void messageArrived(String topic, MqttMessage message) {
                String payload = new String(message.getPayload());
                System.out.println("Payload : " + payload);
                // TODO: Insert your parsing / handling of the configuration message here.
            }

            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                // Do nothing;
            }
        };

        String configTopic = String.format("/devices/%s/config", deviceId);
        client.subscribe(configTopic, 1);

        client.setCallback(mCallback);
    }
    // [END iot_mqtt_configcallback]


    private static void setConnectPassword(CliOptions options, MqttConnectOptions connectOptions)
        throws NoSuchAlgorithmException, IOException, InvalidKeySpecException {
        switch (options.getAlgorithm()) {
            case "RS256":
                connectOptions.setPassword(
                    createJwtRsa(options.getProjectId(), options.getPrivateKeyFile()).toCharArray());
                break;
            case "ES256":
                connectOptions.setPassword(
                    createJwtEs(options.getProjectId(), options.getPrivateKeyFile()).toCharArray());
                break;
            default:
                throw new IllegalArgumentException(
                    "Invalid algorithm " + options.getAlgorithm()
                        + ". Should be one of 'RS256' or 'ES256'.");
        }
    }

    /**
     * Example code do demonstrate sending events from an S7 device to Microsoft Azure IoT Hub
     *
     * @param args Expected: [plc4x connection string, plc4x address, IoT-Hub connection string].
     */
    public static void main(String[] args) throws Exception {

        // [START iot_mqtt_configuremqtt]
        CliOptions options = CliOptions.fromArgs(args);
        if (options == null) {
            CliOptions.printHelp();
            // Could not parse.
            System.exit(1);
        }

        // Build the connection string for Google's Cloud IoT Core MQTT server. Only SSL
        // connections are accepted. For server authentication, the JVM's root certificates
        // are used.
        final String mqttServerAddress =
            String.format("ssl://%s:%s", options.getMqttBridgeHostname(), options.getMqttBridgePort());

        // Create our MQTT client. The mqttClientId is a unique string that identifies this device. For
        // Google Cloud IoT Core, it must be in the format below.
        final String mqttClientId =
            String.format("projects/%s/locations/%s/registries/%s/devices/%s",
                options.getProjectId(), options.getCloudRegion(), options.getRegistryId(), options.getDeviceId());

        MqttConnectOptions connectOptions = new MqttConnectOptions();
        // Note that the Google Cloud IoT Core only supports MQTT 3.1.1, and Paho requires that we
        // explictly set this. If you don't set MQTT version, the server will immediately close its
        // connection to your device.
        connectOptions.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);

        // With Google Cloud IoT Core, the username field is ignored, however it must be set for the
        // Paho client library to send the password field. The password field is used to transmit a JWT
        // to authorize the device.
        connectOptions.setUserName("unused");

        DateTime iat = new DateTime();
        setConnectPassword(options, connectOptions);
        // [END iot_mqtt_configuremqtt]

        // [START iot_mqtt_publish]
        // Create a client, and connect to the Google MQTT bridge.
        MqttClient client = new MqttClient(mqttServerAddress, mqttClientId, new MemoryPersistence());

        // Both connect and publish operations may fail. If they do, allow retries but with an
        // exponential backoff time period.
        long initialConnectIntervalMillis = 500L;
        long maxConnectIntervalMillis = 6000L;
        long maxConnectRetryTimeElapsedMillis = 900000L;
        float intervalMultiplier = 1.5f;

        long retryIntervalMs = initialConnectIntervalMillis;
        long totalRetryTimeMs = 0;

        while (!client.isConnected() && totalRetryTimeMs < maxConnectRetryTimeElapsedMillis) {
            try {
                client.connect(connectOptions);
            } catch (MqttException e) {
                int reason = e.getReasonCode();

                // If the connection is lost or if the server cannot be connected, allow retries, but with
                // exponential backoff.
                System.out.println("An error occurred: " + e.getMessage());
                if (reason == MqttException.REASON_CODE_CONNECTION_LOST
                    || reason == MqttException.REASON_CODE_SERVER_CONNECT_ERROR) {
                    System.out.println("Retrying in " + retryIntervalMs / 1000.0 + " seconds.");
                    Thread.sleep(retryIntervalMs);
                    totalRetryTimeMs += retryIntervalMs;
                    retryIntervalMs *= intervalMultiplier;
                    if (retryIntervalMs > maxConnectIntervalMillis) {
                        retryIntervalMs = maxConnectIntervalMillis;
                    }
                } else {
                    throw e;
                }
            }
        }

        attachCallback(client, options.getDeviceId());

        // Publish to the events or state topic based on the flag.
        String subTopic = "event".equals(options.getMessageType()) ? "events" : options.getMessageType();

        // The MQTT topic that this device will publish telemetry data to. The MQTT topic name is
        // required to be in the format below. Note that this is not the same as the device registry's
        // Cloud Pub/Sub topic.
        String mqttTopic = String.format("/devices/%s/%s", options.getDeviceId(), subTopic);

        // Connect to Plc
        logger.info("Connecting to Plc");
        try (PlcConnection plcConnection = new PlcDriverManager().getConnection("s7://10.10.64.20/1/1")) {
            logger.info("Connected");

            PlcReadRequest readRequest = plcConnection.readRequestBuilder().addItem("outputs", "OUTPUTS/0").build();

            while (!Thread.currentThread().isInterrupted()) {

                PlcReadResponse plcReadResponse = readRequest.execute().get();

                // Refresh the connection credentials before the JWT expires.
                // [START iot_mqtt_jwt_refresh]
                long secsSinceRefresh = ((new DateTime()).getMillis() - iat.getMillis()) / 1000;
                if (secsSinceRefresh > (options.getTokenExpMins() * 60)) {
                    System.out.format("\tRefreshing token after: %d seconds%n", secsSinceRefresh);
                    iat = new DateTime();
                    setConnectPassword(options, connectOptions);
                    client.disconnect();
                    client.connect();
                    attachCallback(client, options.getDeviceId());
                }
                // [END iot_mqtt_jwt_refresh]

                // Send data to cloud
                for (String fieldName : plcReadResponse.getFieldNames()) {
                    Long l = plcReadResponse.getLong(fieldName);
                    byte[] array = ByteBuffer.allocate(8).putLong(l).array();
                    String result = Long.toBinaryString(l);
                    System.out.println("Outputs: " + result);
                    // Publish "array" to the MQTT topic. qos=1 means at least once delivery. Cloud IoT Core
                    // also supports qos=0 for at most once delivery.
                    MqttMessage message = new MqttMessage(array);
                    message.setQos(1);
                    client.publish(mqttTopic, message);
                    if ("event".equals(options.getMessageType())) {
                        // Send telemetry events every second
                        Thread.sleep(1000);
                    } else {
                        // Note: Update Device state less frequently than with telemetry events
                        Thread.sleep(5000);
                    }
                }
            }
        }

        System.out.println("Sent all messages. Goodbye!");
        // [END iot_mqtt_publish]
    }
}
